package me.seu.demo.service.emqx;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Demo MQTT publisher built on the Eclipse Paho client.
 *
 * <p>Connects to a local broker, publishes a batch of messages through
 * {@link MqttClient#publish(String, MqttMessage)}, publishes a second batch of
 * retained messages through an {@link MqttTopic}, keeps the connection open for
 * a while, and then disconnects. The client is always disconnected and closed,
 * even when publishing fails or the thread is interrupted.
 *
 * @author liangfeihu
 * @number 53669
 * @since 2021/4/28 4:25 PM
 */
public class PublishServer {

    /** Broker address the demo connects to. */
    private static final String BROKER_URI = "tcp://127.0.0.1:1883";
    /** Client identifier presented to the broker. */
    private static final String CLIENT_ID = "emqx_test_java";
    /** Topic every demo message is published to. */
    private static final String PUB_TOPIC = "device_config";
    /** QoS level applied to every published message. */
    private static final int QOS = 2;
    /** Number of messages sent in each of the two publish loops. */
    private static final int MESSAGE_COUNT = 5;
    /** Pause between two consecutive publishes, in milliseconds. */
    private static final long PUBLISH_INTERVAL_MILLIS = 1000L;
    /** How long the connection is kept open after publishing, in milliseconds. */
    private static final long LINGER_MILLIS = 5L * 60L * 1000L;

    /**
     * Runs the demo: connect, publish both batches, linger, disconnect.
     *
     * <p>Exits the JVM with status 0 after a fully successful run. On failure the
     * error is printed, the client is still released, and the method returns normally.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        MqttClient client = null;
        boolean succeeded = false;
        boolean interrupted = false;
        try {
            client = new MqttClient(BROKER_URI, CLIENT_ID, new MemoryPersistence());

            // Register the callback before connecting.
            client.setCallback(new OnMessageCallback());

            // Establish the connection.
            System.out.println("Connecting to broker: " + BROKER_URI);
            client.connect(buildConnectOptions());
            System.out.println("Connected");

            publishViaClient(client);
            publishRetainedViaTopic(client);

            // Keep the connection open for a while before shutting down.
            Thread.sleep(LINGER_MILLIS);
            succeeded = true;
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        } catch (InterruptedException ie) {
            // Remember the interrupt; the flag is restored after cleanup so that
            // the blocking disconnect below is not aborted by it.
            interrupted = true;
            ie.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            shutdown(client);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        // System.exit skips finally blocks, so it is only called once cleanup is done.
        if (succeeded) {
            System.exit(0);
        }
    }

    /**
     * Builds the connection options used by the demo.
     *
     * @return options carrying the credentials, clean-session flag, connection
     *         timeout and keep-alive interval
     */
    private static MqttConnectOptions buildConnectOptions() {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName("emqx_test");
        connOpts.setPassword("emqx_test_pwd".toCharArray());
        // Request a clean session (the flag is true, i.e. session state is NOT kept).
        connOpts.setCleanSession(true);
        // Connection timeout value passed to the client.
        connOpts.setConnectionTimeout(10);
        // Keep-alive (heartbeat) interval value passed to the client.
        connOpts.setKeepAliveInterval(20);
        return connOpts;
    }

    /**
     * Publishes {@link #MESSAGE_COUNT} messages directly through the client,
     * pausing between each one.
     *
     * @param client connected client to publish with
     * @throws MqttException if a publish fails
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    private static void publishViaClient(MqttClient client)
            throws MqttException, InterruptedException {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            MqttMessage message = newMessage("Hello MQTT Msg " + i);
            client.publish(PUB_TOPIC, message);
            System.out.println("Message " + i + " published ");
            Thread.sleep(PUBLISH_INTERVAL_MILLIS);
            System.out.println("--------------------------------------");
        }
    }

    /**
     * Publishes {@link #MESSAGE_COUNT} retained messages through an
     * {@link MqttTopic}, waiting for each delivery token to complete.
     *
     * @param client connected client that supplies the topic handle
     * @throws MqttException if a publish or the wait for completion fails
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    private static void publishRetainedViaTopic(MqttClient client)
            throws MqttException, InterruptedException {
        MqttTopic topicPub = client.getTopic(PUB_TOPIC);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            MqttMessage message = newMessage("Hello MQTT topicPub Msg " + i);
            message.setRetained(true);

            MqttDeliveryToken token = topicPub.publish(message);
            // Block until this delivery has completed before reporting it.
            token.waitForCompletion();
            System.out.println("topicPub Message " + i + " published ");

            Thread.sleep(PUBLISH_INTERVAL_MILLIS);

            System.out.println("--------------------------------------");
        }
    }

    /**
     * Creates a message with the demo QoS whose payload is the UTF-8 encoding of
     * {@code content}, so the bytes do not depend on the platform default charset.
     *
     * @param content text to send
     * @return message ready to be published
     */
    private static MqttMessage newMessage(String content) {
        MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
        message.setQos(QOS);
        return message;
    }

    /**
     * Disconnects (when connected) and closes the client. Failures are printed
     * rather than propagated so that cleanup never masks the original error.
     *
     * @param client client to release; may be {@code null} if construction failed
     */
    private static void shutdown(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
                System.out.println("Disconnected");
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

